defmodule Realtime.Tenants.Migrations.FixWalrusRoleHandling do
  @moduledoc false
  use Ecto.Migration

  # Redefines (`create or replace`) the PL/pgSQL function
  # `realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)`, which
  # returns `setof realtime.wal_rls`.
  #
  # What the function body below does, for one WAL change record (`wal`):
  #
  #   * Resolves the target table from `wal ->> 'schema'` / `wal ->> 'table'`
  #     and maps the action code 'I' / 'U' / 'D' to INSERT / UPDATE / DELETE
  #     (anything else becomes 'ERROR').
  #   * Loads the rows of `realtime.subscription` for that table and collects
  #     the distinct `claims_role` values among them.
  #   * Builds `columns` from `wal -> 'columns'` and `old_columns` from
  #     `wal -> 'identity'`, flagging primary-key columns via `wal -> 'pk'`.
  #   * Loops over each distinct role and, per role:
  #       - recomputes `is_selectable` on every column with
  #         `pg_catalog.has_column_privilege(role, table, column, 'SELECT')`;
  #       - for non-DELETE actions, emits 'Error 400: Bad Request, no primary key'
  #         when no primary-key column is present, or 'Error 401: Unauthorized'
  #         when the role cannot SELECT every primary-key column;
  #       - otherwise builds the output payload (schema, table, type,
  #         commit_timestamp, columns, plus `record` for INSERT/UPDATE and
  #         `old_record` for UPDATE/DELETE) restricted to selectable columns.
  #         For DELETE with RLS enabled, `old_record` only carries primary-key
  #         columns;
  #       - decides which subscriptions see the row: subscriptions must pass
  #         `realtime.is_visible_through_filters` (skipped for DELETE), and when
  #         RLS is enabled and the action is not DELETE, the prepared statement
  #         `walrus_rls_stmt` is executed after switching `role` and
  #         `request.jwt.claims` to the subscription's values.
  #   * When `octet_length(wal::text)` exceeds `max_record_bytes`, values whose
  #     text form is longer than 64 octets are left out of `record` /
  #     `old_record` and 'Error 413: Payload Too Large' is returned in the
  #     errors array.
  #   * The session `role` setting is reset with `set_config('role', null, true)`
  #     on entry, after each role's subscription loop, and before returning.
  #
  # The role name passed to `set_config('role', ...)` has surrounding double
  # quotes trimmed; per the inline SQL comment, `set_config` does not accept
  # the quoted form. Presumably this is the "role handling" fix the module
  # name refers to -- confirm against the previous definition of the function.
  #
  # NOTE(review): `execute` is called with a single SQL string inside `change`,
  # so no rollback statement is supplied here; confirm whether rolling back
  # this migration is expected to work.
  def change do
    # The heredoc is sent to the database as-is; it must stay byte-identical
    # to what has already been applied in deployed environments.
    execute """
    create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)
    returns setof realtime.wal_rls
    language plpgsql
    volatile
    as $$
    declare
    -- Regclass of the table e.g. public.notes
    entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;

    -- I, U, D, T: insert, update ...
    action realtime.action = (
        case wal ->> 'action'
            when 'I' then 'INSERT'
            when 'U' then 'UPDATE'
            when 'D' then 'DELETE'
            else 'ERROR'
        end
    );

    -- Is row level security enabled for the table
    is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;

    subscriptions realtime.subscription[] = array_agg(subs)
        from
            realtime.subscription subs
        where
            subs.entity = entity_;

    -- Subscription vars
    roles regrole[] = array_agg(distinct us.claims_role::text)
        from
            unnest(subscriptions) us;

    working_role regrole;
    claimed_role regrole;
    claims jsonb;

    subscription_id uuid;
    subscription_has_access bool;
    visible_to_subscription_ids uuid[] = '{}';

    -- structured info for wal's columns
    columns realtime.wal_column[];
    -- previous identity values for update/delete
    old_columns realtime.wal_column[];

    error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;

    -- Primary jsonb output for record
    output jsonb;

    begin
    perform set_config('role', null, true);

    columns =
        array_agg(
            (
                x->>'name',
                x->>'type',
                x->>'typeoid',
                realtime.cast(
                    (x->'value') #>> '{}',
                    coalesce(
                        (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
                        (x->>'type')::regtype
                    )
                ),
                (pks ->> 'name') is not null,
                true
            )::realtime.wal_column
        )
        from
            jsonb_array_elements(wal -> 'columns') x
            left join jsonb_array_elements(wal -> 'pk') pks
                on (x ->> 'name') = (pks ->> 'name');

    old_columns =
        array_agg(
            (
                x->>'name',
                x->>'type',
                x->>'typeoid',
                realtime.cast(
                    (x->'value') #>> '{}',
                    coalesce(
                        (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
                        (x->>'type')::regtype
                    )
                ),
                (pks ->> 'name') is not null,
                true
            )::realtime.wal_column
        )
        from
            jsonb_array_elements(wal -> 'identity') x
            left join jsonb_array_elements(wal -> 'pk') pks
                on (x ->> 'name') = (pks ->> 'name');

    for working_role in select * from unnest(roles) loop

        -- Update `is_selectable` for columns and old_columns
        columns =
            array_agg(
                (
                    c.name,
                    c.type_name,
                    c.type_oid,
                    c.value,
                    c.is_pkey,
                    pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
                )::realtime.wal_column
            )
            from
                unnest(columns) c;

        old_columns =
                array_agg(
                    (
                        c.name,
                        c.type_name,
                        c.type_oid,
                        c.value,
                        c.is_pkey,
                        pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
                    )::realtime.wal_column
                )
                from
                    unnest(old_columns) c;

        if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then
            return next (
                jsonb_build_object(
                    'schema', wal ->> 'schema',
                    'table', wal ->> 'table',
                    'type', action
                ),
                is_rls_enabled,
                -- subscriptions is already filtered by entity
                (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
                array['Error 400: Bad Request, no primary key']
            )::realtime.wal_rls;

        -- The claims role does not have SELECT permission to the primary key of entity
        elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then
            return next (
                jsonb_build_object(
                    'schema', wal ->> 'schema',
                    'table', wal ->> 'table',
                    'type', action
                ),
                is_rls_enabled,
                (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
                array['Error 401: Unauthorized']
            )::realtime.wal_rls;

        else
            output = jsonb_build_object(
                'schema', wal ->> 'schema',
                'table', wal ->> 'table',
                'type', action,
                'commit_timestamp', to_char(
                    ((wal ->> 'timestamp')::timestamptz at time zone 'utc'),
                    'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'
                ),
                'columns', (
                    select
                        jsonb_agg(
                            jsonb_build_object(
                                'name', pa.attname,
                                'type', pt.typname
                            )
                            order by pa.attnum asc
                        )
                    from
                        pg_attribute pa
                        join pg_type pt
                            on pa.atttypid = pt.oid
                    where
                        attrelid = entity_
                        and attnum > 0
                        and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT')
                )
            )
            -- Add "record" key for insert and update
            || case
                when action in ('INSERT', 'UPDATE') then
                    jsonb_build_object(
                        'record',
                        (
                            select
                                jsonb_object_agg(
                                    -- if unchanged toast, get column name and value from old record
                                    coalesce((c).name, (oc).name),
                                    case
                                        when (c).name is null then (oc).value
                                        else (c).value
                                    end
                                )
                            from
                                unnest(columns) c
                                full outer join unnest(old_columns) oc
                                    on (c).name = (oc).name
                            where
                                coalesce((c).is_selectable, (oc).is_selectable)
                                and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
                        )
                    )
                else '{}'::jsonb
            end
            -- Add "old_record" key for update and delete
            || case
                when action = 'UPDATE' then
                    jsonb_build_object(
                            'old_record',
                            (
                                select jsonb_object_agg((c).name, (c).value)
                                from unnest(old_columns) c
                                where
                                    (c).is_selectable
                                    and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
                            )
                        )
                when action = 'DELETE' then
                    jsonb_build_object(
                        'old_record',
                        (
                            select jsonb_object_agg((c).name, (c).value)
                            from unnest(old_columns) c
                            where
                                (c).is_selectable
                                and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
                                and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey
                        )
                    )
                else '{}'::jsonb
            end;

            -- Create the prepared statement
            if is_rls_enabled and action <> 'DELETE' then
                if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then
                    deallocate walrus_rls_stmt;
                end if;
                execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns);
            end if;

            visible_to_subscription_ids = '{}';

            for subscription_id, claims in (
                    select
                        subs.subscription_id,
                        subs.claims
                    from
                        unnest(subscriptions) subs
                    where
                        subs.entity = entity_
                        and subs.claims_role = working_role
                        and (
                            realtime.is_visible_through_filters(columns, subs.filters)
                            or action = 'DELETE'
                        )
            ) loop

                if not is_rls_enabled or action = 'DELETE' then
                    visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
                else
                    -- Check if RLS allows the role to see the record
                    perform
                        -- Trim leading and trailing quotes from working_role because set_config
                        -- doesn't recognize the role as valid if they are included
                        set_config('role', trim(both '"' from working_role::text), true),
                        set_config('request.jwt.claims', claims::text, true);

                    execute 'execute walrus_rls_stmt' into subscription_has_access;

                    if subscription_has_access then
                        visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
                    end if;
                end if;
            end loop;

            perform set_config('role', null, true);

            return next (
                output,
                is_rls_enabled,
                visible_to_subscription_ids,
                case
                    when error_record_exceeds_max_size then array['Error 413: Payload Too Large']
                    else '{}'
                end
            )::realtime.wal_rls;

        end if;
    end loop;

    perform set_config('role', null, true);
    end;
    $$;
    """
  end
end
